Golang sync 包解析

包的用途

主要用于在并发中(同步机制,原子操作和对象池)

原子操作 atomic.Value

使用的是汇编语言编写,在sync/atomic包中,
在sync.atomic包中,有两个函数,Value.Store()Value.Load()用于对内存中的数据进行原子的读和写,sync中的很多操作都是采用这样的方式对数据的于原子操作

Map 并发Map(1.9更新)

1
2
3
4
5
6
type Map struct {
mu Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}

方法

1
2
3
4
5
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {} //读取map中的值,ok反应是否存在key
func (m *Map) Store(key, value interface{}) {} //原子操作存取数据到map中
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {}
func (m *Map) Delete(key interface{}) {} //删除map中的key
func (m *Map) Range(f func(key, value interface{}) bool) {} //遍历sync.map中的数据

  • 在Go 1.9中sync.Map是怎么实现的呢?它是如何解决并发提升性能的呢?
  • sync.Map的实现有几个优化点,这里先列出来,我们后面慢慢分析。
  • 空间换时间。 通过冗余的两个数据结构(read、dirty),实现加锁对性能的影响。
  • 使用只读数据(read),避免读写冲突。
  • 动态调整,miss次数多了之后,将dirty数据提升为read。
  • double-checking。
  • 延迟删除。 删除一个键值只是打标记,只有在提升dirty的时候才清理删除的数据。
  • 优先从read读取、更新、删除,因为对read的读取不需要锁。

Mutex/RWMutex 读写锁

  • Mutex

    1
    2
    3
    4
    type Mutex struct {
    state int32
    sema uint32
    }

    Lock()方法

    * cas操作修改锁状态到锁定,如果锁是锁定状态,不能修改
    * 则会进入自旋锁,忙等待锁的Unlock,由于自旋锁是有限制的,当到达上线后
    * 就会通过信号量,收到信号量后才能被唤醒,锁定
    

    Unlock()方法

    * case操作修改锁状态到解锁,并且释放信号量
    
  • RWMutex
    1
    2
    3
    4
    5
    6
    7
    type RWMutex struct {
    w Mutex // held if there are pending writers
    writerSem uint32 // semaphore for writers to wait for completing readers
    readerSem uint32 // semaphore for readers to wait for completing writers
    readerCount int32 // number of pending readers
    readerWait int32 // number of departing readers
    }

对象中有读(readerSem)和写(writerSem)两个信号量,相比Mutex,没了自旋锁的,直接获取信号量

ps:RWMutex主要应用在读多写少的场景中

Cond

1
2
3
4
5
6
type Cond struct {
noCopy noCopy //等待计数器
L Locker //锁
notify notifyList //信号量
checker copyChecker
}
  • 在等待一个goruntime执行完成后让使得另一个goruntime执行的话就可以使用cond这样的对象
    usage:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    package main
    import (
    "fmt"
    "sync"
    "time"
    )
    func main() {
    cond := sync.NewCond(new(sync.Mutex))
    go func(cd *sync.Cond) {
    cd.L.Lock()
    cd.Wait()
    fmt.Println("after single...")
    cd.L.Unlock()
    }(cond)

    time.Sleep(time.Second * 2)
    cond.L.Lock()
    cond.Signal()
    fmt.Println("single...")
    cond.L.Unlock()
    time.Sleep(time.Second * 2)
    }

WaitGroup

1
2
3
4
5
type WaitGroup struct {
noCopy noCopy
state1 [12]byte //计数器
sema uint32
}
  • 在多个goruntime执行完成后才能去执行操作使用
  • 必须等上一次使用中所有的Wait均已返回才行
  • 知道计数器值为0,才往下执行
    usage:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    package main
    import (
    "fmt"
    "sync"
    "time"
    )
    func main() {
    wg := new(sync.WaitGroup)
    for i := 0; i < 5; i++ {
    go func(i int) {
    wg.Add(1)
    defer wg.Done()
    time.Sleep(time.Second * 2)
    fmt.Println("exec %d goruntime...", i)
    }(i)
    }
    time.Sleep(time.Second)
    wg.Wait()
    fmt.Println("after all goruntime exec over....")
    }

Once 判断函数是否执行(原子操作)

1
2
3
4
type Once struct {
m Mutex
done uint32
}
  • 如果函数是已经执行的函数,原子判断是否为done如果是的话,就会返回
  • 如果没有执行过,就会加锁后,对该函数进行执行(原子操作)

usage:

1
2
3
4
5
6
7
8
9
10
11
12
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 1 {
return
}
// Slow-path.
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}

0%